package net.p2pcdn.core.order.service;

import net.p2pcdn.common.Date2StringConverter;
import net.p2pcdn.common.TrafficUnitUtils;
import net.p2pcdn.core.order.domain.CDNConsumeRecord;
import net.p2pcdn.core.order.domain.TrafficUpdateRecord;
import net.p2pcdn.core.order.repository.CDNConsumeRecordRepository;
import net.p2pcdn.core.order.repository.CDNDomainRepository;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.math.BigDecimal;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.stream.Collectors;

/**
 * Persists batches of {@link CDNConsumeRecord}s, assigns them batch-scoped ids, and hands the
 * per-domain traffic totals to a thread pool that charges them against the matching contract.
 * Also exposes the effective-domain list and the P2P traffic hit-rate query.
 *
 * @author zx
 */
@Service
public class CDNConsumeRecordService {
    private static final Logger LOGGER = LoggerFactory.getLogger(CDNConsumeRecordService.class);
    /** Redis key of the counter that backs the batch number. */
    private static final String BATCH_NO_KEY = "P2P_batch_no";
    /** Width to which both the batch counter and the per-record index are zero-padded. */
    private static final int SEQUENCE_WIDTH = 6;
    /** Pattern used to parse the day bounds of the hit-rate query. */
    private static final String DATE_TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

    @Resource
    private CDNConsumeRecordRepository cdnConsumeRecordRepository;
    @Resource
    private CDNDomainRepository cdnDomainRepository;
    @Autowired
    private ThreadPoolTaskExecutor springThreadPoolTaskExecutor;
    @Autowired
    private ContractService contractService;
    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    /**
     * Builds the id prefix for one batch: the string returned by
     * {@code Date2StringConverter.yyyyMMdd} for the current date with every {@code -} removed,
     * followed by the incremented Redis counter left-padded with zeros.
     *
     * @return the batch number used as id prefix
     * @throws IllegalStateException if Redis did not return the incremented value
     */
    private String getBatchNo() {
        String date = Date2StringConverter.yyyyMMdd(new Date()).replace("-", "");
        Long value = redisTemplate.boundValueOps(BATCH_NO_KEY).increment();
        if (value == null) {
            // increment() is declared nullable (e.g. inside a Redis pipeline / transaction);
            // fail with a clear message instead of a bare NullPointerException.
            throw new IllegalStateException(
                    "Redis increment of '" + BATCH_NO_KEY + "' returned no value; cannot build a batch number");
        }
        LOGGER.info("====================================1===================================");
        LOGGER.info(value.toString());
        LOGGER.info("=====================================1==================================");
        return date + StringUtils.leftPad(value.toString(), SEQUENCE_WIDTH, "0");
    }

    /**
     * Assigns an id to every record: one fresh batch number shared by the whole list, followed by
     * the 1-based position of the record left-padded with zeros.
     * A {@code null} or empty list is a no-op, so no batch number is consumed for it.
     *
     * @param records records to receive ids; modified in place
     */
    public void generateIDs(List<CDNConsumeRecord> records) {
        if (records == null || records.isEmpty()) {
            return;
        }
        Instant from = Instant.now();
        String prefix = this.getBatchNo();
        int index = 1;
        for (CDNConsumeRecord record : records) {
            record.setId(prefix + StringUtils.leftPad(String.valueOf(index), SEQUENCE_WIDTH, "0"));
            LOGGER.info(record.toString());
            index++;
        }
        Instant to = Instant.now();
        LOGGER.debug("=============================2====generateIDs===================================");
        LOGGER.debug(Duration.between(from, to).getSeconds() + "");
        LOGGER.debug("=============================2====generateIDs===================================");
    }

    /**
     * Assigns ids to the records, batch-inserts them, and submits one {@link Worker} per domain
     * carrying that domain's summed traffic and record ids.
     * A {@code null} or empty list is a no-op.
     *
     * @param records records to persist; their ids are overwritten
     */
    @Transactional(rollbackFor = Exception.class)
    public void saveCDNConsumeRecord(List<CDNConsumeRecord> records) {
        if (records == null || records.isEmpty()) {
            LOGGER.debug("saveCDNConsumeRecord called without records, nothing to do");
            return;
        }
        Instant from = Instant.now();
        generateIDs(records);
        cdnConsumeRecordRepository.batchInsert(records);
        DataMergeBO bo = mergeTrafficAmount(records);
        // NOTE(review): the workers are submitted while this method's transaction is presumably
        // still open, so a worker may run before the inserted rows are committed — confirm
        // whether the submission should be deferred until after commit.
        bo.getDomainRelatedTraffic().forEach((domain, traffic) -> {
            // BigDecimal.valueOf uses the canonical decimal form of the double, whereas
            // new BigDecimal(double) would carry its full binary expansion.
            Task task = new Task(domain, BigDecimal.valueOf(traffic), bo.getDomainRelatedRecordIds().get(domain));
            springThreadPoolTaskExecutor.submit(new Worker(task, this));
        });
        Instant to = Instant.now();
        LOGGER.debug("==========================3=======saveCDNConsumeRecord===================================");
        LOGGER.debug(Duration.between(from, to).getSeconds() + "");
        LOGGER.debug("===========================3======saveCDNConsumeRecord===================================");
    }

    /**
     * Groups the records by domain, collecting the record ids and the sum of
     * {@code p2p_downloaded} for each domain.
     *
     * @param records records to aggregate; must not be {@code null}
     * @return the per-domain id lists and traffic sums wrapped in a {@link DataMergeBO}
     */
    public DataMergeBO mergeTrafficAmount(List<CDNConsumeRecord> records) {
        Instant from = Instant.now();
        Map<String, List<String>> ids = new HashMap<>(4);
        for (CDNConsumeRecord item : records) {
            ids.computeIfAbsent(item.getDomain(), domain -> new ArrayList<>()).add(item.getId());
        }
        // Sequential on purpose: floating-point addition is order-sensitive, so a parallel
        // reduction could yield slightly different sums from run to run.
        Map<String, Double> trafficMap = records.stream().collect(
                Collectors.groupingBy(CDNConsumeRecord::getDomain,
                        Collectors.summingDouble(CDNConsumeRecord::getP2p_downloaded)));
        Instant to = Instant.now();
        LOGGER.debug("=============================4====mergeTrafficAmount===================================");
        LOGGER.debug(Duration.between(from, to).getSeconds() + "");
        LOGGER.debug("===============================4==mergeTrafficAmount===================================");
        return new DataMergeBO(ids, trafficMap);
    }

    /**
     * Runnable that passes a single {@link Task} to
     * {@link CDNConsumeRecordService#batchHandle(Task)}.
     */
    public static class Worker implements Runnable {
        private final Task task;
        private final CDNConsumeRecordService service;

        /**
         * @param task    the per-domain work item to process
         * @param service the service whose {@code batchHandle} is invoked
         */
        public Worker(Task task, CDNConsumeRecordService service) {
            this.task = task;
            this.service = service;
        }

        /**
         * @return the task this worker processes
         */
        public Task getTask() {
            return task;
        }

        @Override
        public void run() {
            LOGGER.info(task.toString());
            try {
                service.batchHandle(task);
            } catch (RuntimeException e) {
                // The Future returned by submit() is discarded by the caller, so an exception
                // would otherwise vanish without a trace; log it before propagating.
                LOGGER.error("batchHandle failed for task {}", task, e);
                throw e;
            }
        }
    }

    /**
     * @return the domains reported by {@code CDNDomainRepository.findEffectiveDomains()}
     */
    public List<String> getEffectiveDomains() {
        return cdnDomainRepository.findEffectiveDomains();
    }

    /**
     * Looks up the contract of the task's domain and, when one exists, consumes the task's
     * traffic (converted with {@code TrafficUnitUtils.toGB}) from it and then marks the task's
     * records as done for that contract. When no contract id is found nothing is changed.
     *
     * @param task the per-domain traffic total together with the ids of the contributing records
     */
    public void batchHandle(Task task) {
        Instant from = Instant.now();
        Integer contractId = contractService.findContractIdByDomain(task.getDomain());
        LOGGER.debug("============================contractId {}=====batchHandle===================================",contractId == null ? "0":contractId);
        if (contractId != null) {
            // NOTE(review): -1 is passed as the first argument of consumeTraffic; its meaning is
            // not visible here — confirm against ContractService.
            contractService.consumeTraffic(-1, contractId, TrafficUnitUtils.toGB(task.getTraffic()), TrafficUpdateRecord.ChangeType.CONSUME);
            cdnConsumeRecordRepository.done(task.getRecordIds(), contractId);
        }
        Instant to = Instant.now();
        LOGGER.debug("============================5=====batchHandle===================================");
        LOGGER.debug(Duration.between(from, to).getSeconds() + "");
        LOGGER.debug("============================5=====batchHandle===================================");
    }

    /**
     * Queries the P2P traffic rate of a user for a time window.
     *
     * @param userId id of the user to query
     * @param from   first day of the window, appended with {@code " 00:00:00"} and parsed as
     *               {@code yyyy-MM-dd HH:mm:ss}; when blank,
     *               {@code Date2StringConverter.getDateOfStartTime()} is used
     * @param to     last day of the window, appended with {@code " 23:59:00"} and parsed the same
     *               way; when blank, {@code Date2StringConverter.getDateOfEndTime()} is used
     * @return the repository result wrapped in a {@link P2PTrafficHitRateVO}
     * @throws ParseException if a non-blank {@code from} or {@code to} cannot be parsed
     */
    public P2PTrafficHitRateVO getP2pTrafficHitRate(int userId, String from, String to) throws ParseException {
        Map<String, Object> params = new HashMap<>(8);
        params.put("userId", userId);
        // Local instance: SimpleDateFormat is not thread-safe and must not be shared.
        SimpleDateFormat sdf = new SimpleDateFormat(DATE_TIME_PATTERN);
        if (StringUtils.isNotBlank(from)) {
            params.put("from", sdf.parse(from + " 00:00:00"));
        } else {
            params.put("from", Date2StringConverter.getDateOfStartTime());
        }
        if (StringUtils.isNotBlank(to)) {
            // The upper bound must come from 'to'; it was previously built from 'from'.
            // NOTE(review): the suffix stops at 23:59:00, leaving the last 59 seconds of the day
            // outside the window — confirm whether 23:59:59 is intended.
            params.put("to", sdf.parse(to + " 23:59:00"));
        } else {
            params.put("to", Date2StringConverter.getDateOfEndTime());
        }
        return new P2PTrafficHitRateVO(cdnConsumeRecordRepository.calcP2PTrafficRate(params));
    }
}
